注册中心源码 EnableEurekaServer
注册:注册中心启动后,会暴露一个注册接口 ApplicationResource#addInstance,请求方法为 POST。
ApplicationResource#addInstance 注册接口
@POST
@Consumes({"application/json", "application/xml"})
/**
 * Handles an instance registration request.
 *
 * <p>The request is rejected with HTTP 400 and a descriptive entity as soon as one of the
 * required fields (instance id, host name, IP address, app name, data center info and its
 * name) is missing, or when the app name differs from {@code this.appName}. Otherwise the
 * instance is passed to {@code this.registry.register(...)} and an empty HTTP 204 is returned.
 *
 * @param info          the instance being registered
 * @param isReplication value of the {@code x-netflix-discovery-replication} header; only the
 *                      exact string {@code "true"} is treated as a replication call
 * @return a 400 response describing the first failed check, or a 204 response on success
 */
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);

    // Guard clauses: the first failing check decides the response.
    if (this.isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    }
    if (this.isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    }
    if (this.isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    }
    if (this.isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    }
    if (!this.appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
    }
    if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    }
    if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // Extra handling applies only to data center infos that expose an id and whose id is blank.
    DataCenterInfo dcInfo = info.getDataCenterInfo();
    if (dcInfo instanceof UniqueIdentifier && this.isBlank(((UniqueIdentifier) dcInfo).getId())) {
        String strictFlag = this.serverConfig.getExperimental("registration.validation.dataCenterInfoId");
        if ("true".equalsIgnoreCase(strictFlag)) {
            // With the experimental flag on, a blank id is a hard failure.
            String message = "DataCenterInfo of type " + dcInfo.getClass() + " must contain a valid id";
            return Response.status(400).entity(message).build();
        }

        if (!(dcInfo instanceof AmazonInfo)) {
            logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dcInfo.getClass());
        } else {
            AmazonInfo awsInfo = (AmazonInfo) dcInfo;
            // Fall back to the instance's own id when the Amazon metadata carries no instance id.
            if (awsInfo.get(MetaDataKey.instanceId) == null) {
                awsInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
            }
        }
    }

    // Delegate to the registry's register method (PeerAwareInstanceRegistry).
    this.registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();
}
PeerAwareInstanceRegistry#register()
/**
 * Registers an instance and then replicates the registration to peers.
 *
 * <p>The lease duration is taken from the instance's lease info when that is present and
 * positive; otherwise 90 seconds is used.
 *
 * @param info          the instance to register
 * @param isReplication flag forwarded unchanged to the superclass registration and to
 *                      {@code replicateToPeers}
 */
public void register(InstanceInfo info, boolean isReplication) {
    final boolean hasCustomDuration = info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseSeconds = hasCustomDuration ? info.getLeaseInfo().getDurationInSecs() : 90;

    // Delegate the actual registration to the superclass.
    super.register(info, leaseSeconds, isReplication);

    this.replicateToPeers(
            PeerAwareInstanceRegistryImpl.Action.Register,
            info.getAppName(),
            info.getId(),
            info,
            (InstanceStatus) null,
            isReplication);
}
AbstractInstanceRegistry#register()
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { /* 方法体略 */ }
续约
InstanceResource 续约接口(请求方法为 PUT)
/**
 * Handles a lease renewal request for the instance identified by {@code this.id}.
 *
 * <p>Returns 404 when {@code this.registry.renew(...)} reports failure. On success, when a
 * {@code lastDirtyTimestamp} is supplied and the server config asks for syncing on timestamp
 * differences, the response comes from {@code validateDirtyTimestamp}; otherwise it is a plain
 * 200 response.
 *
 * @param isReplication      value of the {@code x-netflix-discovery-replication} header; only
 *                           the exact string {@code "true"} marks a call from a replica node
 * @param overriddenStatus   {@code overriddenstatus} query parameter; stored through the registry
 *                           only when validation yields 404, the value is non-null and not
 *                           {@code UNKNOWN}, and the call comes from a replica node
 * @param status             {@code status} query parameter; not read by this method
 * @param lastDirtyTimestamp {@code lastDirtyTimestamp} query parameter, parsed with
 *                           {@code Long.valueOf} when validation is performed
 * @return the response to send back to the caller
 */
public Response renewLease(@HeaderParam("x-netflix-discovery-replication") String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    final boolean fromReplica = "true".equals(isReplication);

    // Unknown lease: tell the caller so with a 404.
    if (!this.registry.renew(this.app.getName(), this.id, fromReplica)) {
        logger.warn("Not Found (Renew): {} - {}", this.app.getName(), this.id);
        return Response.status(Status.NOT_FOUND).build();
    }

    final boolean validateTimestamp = lastDirtyTimestamp != null && this.serverConfig.shouldSyncWhenTimestampDiffers();
    Response response;
    if (!validateTimestamp) {
        response = Response.ok().build();
    } else {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), fromReplica);

        boolean validationNotFound = response.getStatus() == Status.NOT_FOUND.getStatusCode();
        boolean hasUsableOverride = overriddenStatus != null && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus);
        if (validationNotFound && hasUsableOverride && fromReplica) {
            this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus));
        }
    }

    logger.debug("Found (Renew): {} - {}; reply status={}", new Object[]{this.app.getName(), this.id, response.getStatus()});
    return response;
}
通过以上源码得知注册中心本质上是一个双层的 ConcurrentHashMap,存储在内存中的。
第一层的 key 是spring.application.name,value 是第二层 ConcurrentHashMap;
第二层 ConcurrentHashMap 的 key 是服务的 InstanceId,value 是 Lease 对象;
Lease 对象包含了服务详情和服务治理相关的属性。
客户端源码 EnableEurekaClient
注解 EnableEurekaClient 通过 Spring Boot 自动配置后会生成一个 DiscoveryClient 实例
DiscoveryClient 初始化时 拉取注册表信息,服务注册,初始化心跳机制,缓存刷新,按需注册定时任务等等。
1 |
|
拉取注册信息
private boolean fetchRegistry(boolean forceFullRegistryFetch) { /* 方法体略 */ }
服务注册 通过eurekaTransport 调用向 eureka server 进行服务注册
boolean register() throws Throwable { /* 方法体略 */ }
心跳机制
心跳机制的初始化工作也是在 DiscoveryClient 构造函数中完成。在DiscoveryClient构造函数的最后,有一个初始化调度任务的方法,在这个方法里就包括心跳的初始化。
创建线程池
// Executor used for heartbeats: core size 1, maximum size taken from the client config,
// a SynchronousQueue as the work queue, and daemon threads named
// "DiscoveryClient-HeartbeatExecutor-%d".
this.heartbeatExecutor = new ThreadPoolExecutor(
        1,
        this.clientConfig.getHeartbeatExecutorThreadPoolSize(),
        0L,
        TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());
//TimedSupervisorTask
TimedSupervisorTask 是 eureka 中自动调节间隔的周期性任务类。HeartbeatThread 是具体执行任务的线程,run 方法中执行的就是 renew() 续期。
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
执行 renew()
11private class HeartbeatThread implements Runnable {
private HeartbeatThread() {
}
public void run() {
if (DiscoveryClient.this.renew()) {
DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
renew() 方法
23boolean renew() {
try {
// 通过eurekaTransport 进行调用
EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null);
logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
// 404 未发现
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
this.REREGISTER_COUNTER.increment();
logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName());
long timestamp = this.instanceInfo.setIsDirtyWithTime();
boolean success = this.register(); // 重新注册
if (success) {
this.instanceInfo.unsetIsDirty(timestamp);
}
return success;
} else {
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
} catch (Throwable var5) {
logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5);
return false;
}
}
总结
- 注册中心启动后生成一系列 REST 接口:ApplicationResource 提供注册接口,InstanceResource 提供续约接口
- 客户端启动后通过 DiscoveryClient#register() 向注册中心(服务端)注册
- 客户端通过 DiscoveryClient#getAndUpdateDelta() 获取注册列表
- 客户端通过 DiscoveryClient#renew() 进行续约